/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.end2end;

import static org.apache.phoenix.util.TestUtil.A_VALUE;
import static org.apache.phoenix.util.TestUtil.B_VALUE;
import static org.apache.phoenix.util.TestUtil.C_VALUE;
import static org.apache.phoenix.util.TestUtil.E_VALUE;
import static org.apache.phoenix.util.TestUtil.ROW1;
import static org.apache.phoenix.util.TestUtil.ROW2;
import static org.apache.phoenix.util.TestUtil.ROW3;
import static org.apache.phoenix.util.TestUtil.ROW4;
import static org.apache.phoenix.util.TestUtil.ROW5;
import static org.apache.phoenix.util.TestUtil.ROW6;
import static org.apache.phoenix.util.TestUtil.ROW7;
import static org.apache.phoenix.util.TestUtil.ROW8;
import static org.apache.phoenix.util.TestUtil.ROW9;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;

@Category(ParallelStatsDisabledTest.class)
@RunWith(Parameterized.class)
public class DerivedTableIT extends ParallelStatsDisabledIT {
  // Organization/tenant id passed to initATableValues when the fixture table is populated.
  private static final String tenantId = getOrganizationId();
  // Placeholder used inside the parameterized DDL and plan strings; initTable() replaces it
  // with the table name generated for the current test.
  private static final String dynamicTableName = "_TABLENAME_REPLACEABLE_";
  // JUnit rule that exposes the name of the currently running test method.
  @Rule
  public TestName name = new TestName();

  // Index DDL statements for this parameter set; an empty array means no index is created.
  private String[] indexDDL;
  // Expected EXPLAIN plan texts for this parameter set; placeholders are resolved in initTable().
  private String[] plans;
  // Table generated for the current test; set in initTable() and reset to null in cleanUp().
  private String tableName;

  private static final Logger LOGGER = LoggerFactory.getLogger(DerivedTableIT.class);

  /**
   * Creates one instance per parameter set supplied by {@link #data()}.
   * @param indexDDL index DDL statements to run against the test table; may be empty
   * @param plans    expected explain plans, still containing the table-name placeholder
   */
  public DerivedTableIT(String[] indexDDL, String[] plans) {
    this.plans = plans;
    this.indexDDL = indexDDL;
  }

  /**
   * Builds the fixture for a single test: generates a unique table name, populates the table
   * through {@code initATableValues}, runs the parameterized index DDL (if any) against it and
   * resolves the table-name placeholder in the expected plans.
   * @throws RuntimeException if a previous test left {@code tableName} set
   * @throws Exception        if table population or index creation fails
   */
  @Before
  public void initTable() throws Exception {
    if (tableName != null) {
      throw new RuntimeException("Test has not been cleaned up.");
    }
    tableName = generateUniqueName();

    initATableValues(tableName, tenantId, getDefaultSplits(tenantId), null, null, getUrl(), null);
    if (indexDDL != null && indexDDL.length > 0) {
      Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
      Connection conn = DriverManager.getConnection(getUrl(), props);
      // Close the DDL connection even when index creation fails; it was previously left open.
      try {
        for (String ddl : indexDDL) {
          conn.createStatement().execute(ddl.replace(dynamicTableName, tableName));
        }
      } finally {
        conn.close();
      }
    }
    // Allocate the substituted copy only after the null check; sizing the array first
    // dereferenced a null 'plans' and made the guard below unreachable.
    if (plans != null && plans.length > 0) {
      String[] newplan = new String[plans.length];
      for (int i = 0; i < plans.length; i++) {
        newplan[i] = plans[i].replace(dynamicTableName, tableName);
      }
      // Assign a fresh array instead of editing in place so the shared parameter array
      // keeps its placeholders.
      plans = newplan;
    }
  }

  /**
   * Tears down after each test: records whether any store ref count leaked, clears
   * {@code tableName}, and only then fails the test if a leak was seen.
   */
  @After
  public void cleanUp() throws Exception {
    // Evaluate the leak check first so that tableName is reset before the assertion can fail.
    final boolean leaked = isAnyStoreRefCountLeaked();
    tableName = null;
    assertFalse("refCount leaked", leaked);
  }

  /**
   * Supplies the parameter sets. Each entry is a {@code String[][]} holding the index DDL
   * statements followed by the two expected explain plans; all strings still contain the
   * table-name placeholder.
   * @return one entry with a covered index on {@code a_byte} and one entry without any index
   */
  @Parameters(name = "DerivedTableIT_{index}") // name is used by failsafe as file name in reports
  public static synchronized Collection<Object> data() {
    String indexName = dynamicTableName + "_DERIVED_IDX";

    // Plan prefix when the covered index is scanned (column names are quoted).
    String indexedHead = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + indexName + " \n"
      + "    SERVER AGGREGATE INTO DISTINCT ROWS BY [\"A_STRING\", \"B_STRING\"]\n"
      + "CLIENT MERGE SORT\n";
    // Plan prefix when the data table itself is scanned.
    String tableHead = "CLIENT PARALLEL 4-WAY FULL SCAN OVER " + dynamicTableName + " \n"
      + "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n"
      + "CLIENT MERGE SORT\n";
    // Tail of the first plan, after the sort on b_string.
    String sortedTail = "CLIENT SORTED BY [A]\n"
      + "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n"
      + "CLIENT SORTED BY [A DESC]";
    // Tail of the second plan; identical for both parameter sets.
    String distinctTail = "CLIENT AGGREGATE INTO ORDERED DISTINCT ROWS BY [A]\n"
      + "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]\n"
      + "CLIENT SORTED BY [A DESC]";

    String[] indexedDdl = { "CREATE INDEX " + indexName + " ON " + dynamicTableName
      + " (a_byte) INCLUDE (A_STRING, B_STRING)" };
    String[] indexedPlans = {
      indexedHead + "CLIENT SORTED BY [\"B_STRING\"]\n" + sortedTail,
      indexedHead + distinctTail };

    String[] noDdl = {};
    String[] tablePlans = {
      tableHead + "CLIENT SORTED BY [B_STRING]\n" + sortedTail,
      tableHead + distinctTail };

    List<Object> parameterSets = Lists.newArrayList();
    parameterSets.add(new String[][] { indexedDdl, indexedPlans });
    parameterSets.add(new String[][] { noDdl, tablePlans });
    return parameterSets;
  }

  /**
   * Verifies WHERE filters placed inside a derived table, outside it, and on both sides,
   * combined with GROUP BY, HAVING, LIMIT and OFFSET. Every scenario checks the exact rows
   * returned and that the result set holds nothing beyond them.
   * @throws Exception if a query fails
   */
  @Test
  public void testDerivedTableWithWhere() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Connection conn = DriverManager.getConnection(getUrl(), props);
    try {
      // (where)
      String query =
        "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM "
          + tableName + " WHERE a_byte + 1 < 9) AS t";
      PreparedStatement statement = conn.prepareStatement(query);
      ResultSet rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW1, rs.getString(1));
      assertEquals(11, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW2, rs.getString(1));
      assertEquals(12, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW3, rs.getString(1));
      assertEquals(13, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW4, rs.getString(1));
      assertEquals(14, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW5, rs.getString(1));
      assertEquals(15, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW6, rs.getString(1));
      assertEquals(16, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW7, rs.getString(1));
      assertEquals(17, rs.getInt(2));

      assertFalse(rs.next());

      // () where
      query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM "
        + tableName + ") AS t WHERE t.b = '" + C_VALUE + "'";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW2, rs.getString(1));
      assertEquals(12, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW5, rs.getString(1));
      assertEquals(15, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW8, rs.getString(1));
      assertEquals(18, rs.getInt(2));

      assertFalse(rs.next());

      // (where) where
      query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM "
        + tableName + " WHERE a_byte + 1 < 9) AS t WHERE t.b = '" + C_VALUE + "'";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW2, rs.getString(1));
      assertEquals(12, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW5, rs.getString(1));
      assertEquals(15, rs.getInt(2));

      assertFalse(rs.next());

      // (groupby where) where
      query = "SELECT t.a, t.c, t.m FROM (SELECT a_string a, count(*) c, max(a_byte) m FROM "
        + tableName + " WHERE a_byte != 8 GROUP BY a_string) AS t WHERE t.c > 1";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(A_VALUE, rs.getString(1));
      assertEquals(4, rs.getInt(2));
      assertEquals(4, rs.getInt(3));
      assertTrue(rs.next());
      assertEquals(B_VALUE, rs.getString(1));
      assertEquals(3, rs.getInt(2));
      assertEquals(7, rs.getInt(3));

      assertFalse(rs.next());

      // (groupby having where) where
      query =
        "SELECT t.a, t.c, t.m FROM (SELECT a_string a, count(*) c, max(a_byte) m FROM " + tableName
          + " WHERE a_byte != 8 GROUP BY a_string HAVING count(*) >= 2) AS t WHERE t.a != '"
          + A_VALUE + "'";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(B_VALUE, rs.getString(1));
      assertEquals(3, rs.getInt(2));
      assertEquals(7, rs.getInt(3));

      assertFalse(rs.next());

      // (limit) where
      query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM " + tableName
        + " LIMIT 2) AS t WHERE t.b = '" + C_VALUE + "'";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW2, rs.getString(1));

      assertFalse(rs.next());

      // ((where limit) where limit) limit
      query = "SELECT u.eid FROM (SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM "
        + tableName + " WHERE a_string = '" + B_VALUE + "' LIMIT 5) AS t WHERE t.b = '" + C_VALUE
        + "' LIMIT 4) AS u WHERE u.eid >= '" + ROW1 + "' LIMIT 3";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW5, rs.getString(1));
      assertTrue(rs.next());
      assertEquals(ROW8, rs.getString(1));

      assertFalse(rs.next());

      // (count) where
      query = "SELECT t.c FROM (SELECT count(*) c FROM " + tableName + ") AS t WHERE t.c > 0";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(9, rs.getInt(1));

      assertFalse(rs.next());

      // Inner limit < outer query offset
      query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM "
        + tableName + " LIMIT 1 OFFSET 1 ) AS t WHERE t.b = '" + C_VALUE + "' OFFSET 2";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertFalse(rs.next());

      // (where) offset
      query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM "
        + tableName + " WHERE a_byte + 1 < 9 ) AS t OFFSET 2";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW3, rs.getString(1));
      assertEquals(13, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW4, rs.getString(1));
      assertEquals(14, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW5, rs.getString(1));
      assertEquals(15, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW6, rs.getString(1));
      assertEquals(16, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW7, rs.getString(1));
      assertEquals(17, rs.getInt(2));

      // The inner filter yields ROW1..ROW7 (first scenario), so OFFSET 2 leaves exactly the
      // five rows above; without this check extra trailing rows would go unnoticed.
      assertFalse(rs.next());

      // (offset) where
      query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM "
        + tableName + " OFFSET 4) AS t WHERE t.b = '" + C_VALUE + "'";
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();
      assertTrue(rs.next());
      assertEquals(ROW5, rs.getString(1));
      assertEquals(15, rs.getInt(2));
      assertTrue(rs.next());
      assertEquals(ROW8, rs.getString(1));
      assertEquals(18, rs.getInt(2));

      // Of the rows matching C_VALUE (ROW2, ROW5, ROW8) only ROW5 and ROW8 lie past the
      // inner OFFSET 4, so nothing else may follow.
      assertFalse(rs.next());

    } finally {
      conn.close();
    }
  }

  /**
   * Verifies GROUP BY applied outside a derived table, inside it, and on both levels,
   * optionally combined with HAVING, ORDER BY, DISTINCT and OFFSET. For the two
   * COLLECTDISTINCT queries the explain plan (with the region-location fragment cut out)
   * is also compared against the parameterized expected plans.
   * @throws Exception if a query fails
   */
  @Test
  public void testDerivedTableWithGroupBy() throws Exception {
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Connection conn = DriverManager.getConnection(getUrl(), connProps);
    try {
      // () groupby having
      String sql = "SELECT t.a, count(*), max(t.s) FROM (SELECT a_string a, a_byte s FROM "
        + tableName + " WHERE a_byte != 8) AS t GROUP BY t.a HAVING count(*) > 1";
      PreparedStatement stmt = conn.prepareStatement(sql);
      ResultSet rows = stmt.executeQuery();
      String[] keys = { A_VALUE, B_VALUE };
      int[][] countAndMax = { { 4, 4 }, { 3, 7 } };
      for (int i = 0; i < keys.length; i++) {
        assertTrue(rows.next());
        assertEquals(keys[i], rows.getString(1));
        assertEquals(countAndMax[i][0], rows.getInt(2));
        assertEquals(countAndMax[i][1], rows.getInt(3));
      }
      assertFalse(rows.next());

      // (groupby) groupby
      sql = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM " + tableName
        + " GROUP BY a_string) AS t GROUP BY t.c";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      int[][] sizeAndFrequency = { { 1, 1 }, { 4, 2 } };
      for (int[] pair : sizeAndFrequency) {
        assertTrue(rows.next());
        assertEquals(pair[0], rows.getInt(1));
        assertEquals(pair[1], rows.getInt(2));
      }
      assertFalse(rows.next());

      // (groupby) groupby orderby
      sql = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM " + tableName
        + " GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      // Same pairs as above, most frequent first.
      for (int i = sizeAndFrequency.length - 1; i >= 0; i--) {
        assertTrue(rows.next());
        assertEquals(sizeAndFrequency[i][0], rows.getInt(1));
        assertEquals(sizeAndFrequency[i][1], rows.getInt(2));
      }
      assertFalse(rows.next());

      // (groupby a, b orderby b) groupby a orderby a
      sql =
        "SELECT t.a, COLLECTDISTINCT(t.b) FROM (SELECT b_string b, a_string a FROM " + tableName
          + " GROUP BY a_string, b_string ORDER BY b_string) AS t GROUP BY t.a ORDER BY t.a DESC";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      assertTrue(rows.next());
      assertEquals(C_VALUE, rows.getString(1));
      Array expected = conn.createArrayOf("VARCHAR", new String[] { E_VALUE });
      assertEquals(expected, rows.getArray(2));
      assertTrue(rows.next());
      assertEquals(B_VALUE, rows.getString(1));
      expected = conn.createArrayOf("VARCHAR", new String[] { B_VALUE, C_VALUE, E_VALUE });
      assertEquals(expected, rows.getArray(2));
      // The A_VALUE group is compared against the same three-element array.
      assertTrue(rows.next());
      assertEquals(A_VALUE, rows.getString(1));
      assertEquals(expected, rows.getArray(2));

      assertFalse(rows.next());

      rows = conn.createStatement().executeQuery("EXPLAIN WITH REGIONS " + sql);
      String planText = QueryUtil.getExplainPlan(rows);
      LOGGER.info("Explain plan output: {}", planText);
      // Drop the region-location fragment before comparing with the expected plan.
      String[] aroundRegions = planText.split("\\n \\(region locations = \\[region=");
      String[] afterRegions = aroundRegions[1].split("]\\)");
      assertEquals(plans[0], aroundRegions[0] + afterRegions[1]);

      // distinct b (groupby a, b) groupby a orderby a
      sql = "SELECT DISTINCT COLLECTDISTINCT(t.b) FROM (SELECT b_string b, a_string a FROM "
        + tableName + " GROUP BY a_string, b_string) AS t GROUP BY t.a ORDER BY t.a DESC";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      assertTrue(rows.next());
      expected = conn.createArrayOf("VARCHAR", new String[] { E_VALUE });
      assertEquals(expected, rows.getArray(1));
      assertTrue(rows.next());
      expected = conn.createArrayOf("VARCHAR", new String[] { B_VALUE, C_VALUE, E_VALUE });
      assertEquals(expected, rows.getArray(1));

      assertFalse(rows.next());

      rows = conn.createStatement().executeQuery("EXPLAIN WITH REGIONS " + sql);
      planText = QueryUtil.getExplainPlan(rows);
      LOGGER.info("Explain plan output: {}", planText);
      aroundRegions = planText.split("\\n \\(region locations = \\[region=");
      afterRegions = aroundRegions[1].split("]\\)");
      assertEquals(plans[1], aroundRegions[0] + afterRegions[1]);

      // (orderby) groupby
      sql = "SELECT t.a_string, count(*) FROM (SELECT * FROM " + tableName
        + " order by a_integer) AS t where a_byte != 8 group by t.a_string";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] groups = { A_VALUE, B_VALUE, C_VALUE };
      int[] groupSizes = { 4, 3, 1 };
      for (int i = 0; i < groups.length; i++) {
        assertTrue(rows.next());
        assertEquals(groups[i], rows.getString(1));
        assertEquals(groupSizes[i], rows.getInt(2));
      }
      assertFalse(rows.next());

      // (groupby) groupby orderby offset
      sql = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM " + tableName
        + " GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC OFFSET 1";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      assertTrue(rows.next());
      assertEquals(1, rows.getInt(1));
      assertEquals(1, rows.getInt(2));

      assertFalse(rows.next());

    } finally {
      conn.close();
    }
  }

  /**
   * Verifies ORDER BY placed inside a derived table, outside it, on both levels (outer
   * order wins), and applied on top of an inner LIMIT.
   * @throws Exception if a query fails
   */
  @Test
  public void testDerivedTableWithOrderBy() throws Exception {
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Connection conn = DriverManager.getConnection(getUrl(), connProps);
    try {
      // Entity ids in the order expected for ORDER BY b_string, entity_id.
      String[] byBThenEid = { ROW1, ROW4, ROW7, ROW2, ROW5, ROW8, ROW3, ROW6, ROW9 };

      // (orderby)
      String sql = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM " + tableName
        + " ORDER BY b, eid) AS t";
      PreparedStatement stmt = conn.prepareStatement(sql);
      ResultSet rows = stmt.executeQuery();
      for (String eid : byBThenEid) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // () orderby
      sql = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM " + tableName
        + ") AS t ORDER BY t.b, t.eid";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String eid : byBThenEid) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (orderby) orderby
      sql = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM " + tableName
        + " ORDER BY b, eid) AS t ORDER BY t.b DESC, t.eid DESC";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      // Both keys descending: the expected order is the exact reverse of the ascending one.
      for (int i = byBThenEid.length - 1; i >= 0; i--) {
        assertTrue(rows.next());
        assertEquals(byBThenEid[i], rows.getString(1));
      }
      assertFalse(rows.next());

      // (limit) orderby
      sql = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM " + tableName
        + " LIMIT 2) AS t ORDER BY t.b DESC, t.eid";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] limitedThenSorted = { ROW2, ROW1 };
      for (String eid : limitedThenSorted) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());
    } finally {
      conn.close();
    }
  }

  /**
   * Verifies LIMIT placed inside a derived table, outside it, and on both levels (literal
   * and bound values), plus LIMIT on top of an ordered aggregate and of a UNION ALL.
   * @throws Exception if a query fails
   */
  @Test
  public void testDerivedTableWithLimit() throws Exception {
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Connection conn = DriverManager.getConnection(getUrl(), connProps);
    try {
      // The first five scenarios all return exactly these two rows.
      String[] firstTwo = { ROW1, ROW2 };

      // (limit)
      String sql = "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + " LIMIT 2) AS t";
      PreparedStatement stmt = conn.prepareStatement(sql);
      ResultSet rows = stmt.executeQuery();
      for (String eid : firstTwo) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // () limit
      sql = "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + ") AS t LIMIT 2";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String eid : firstTwo) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (limit 2) limit 4
      sql =
        "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + " LIMIT 2) AS t LIMIT 4";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String eid : firstTwo) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (limit 4) limit 2
      sql =
        "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + " LIMIT 4) AS t LIMIT 2";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String eid : firstTwo) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // limit ? limit ?
      sql =
        "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + " LIMIT ?) AS t LIMIT ?";
      stmt = conn.prepareStatement(sql);
      // Inner limit 4, outer limit 2.
      stmt.setInt(1, 4);
      stmt.setInt(2, 2);
      rows = stmt.executeQuery();
      for (String eid : firstTwo) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (groupby orderby) limit
      sql = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM " + tableName
        + " GROUP BY a_string ORDER BY sum(a_byte)) LIMIT 2";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] smallestSumKeys = { C_VALUE, A_VALUE };
      int[] smallestSums = { 9, 10 };
      for (int i = 0; i < smallestSumKeys.length; i++) {
        assertTrue(rows.next());
        assertEquals(smallestSumKeys[i], rows.getString(1));
        assertEquals(smallestSums[i], rows.getInt(2));
      }
      assertFalse(rows.next());

      // (union) groupby limit
      sql = "SELECT a_string, count(*) FROM (SELECT a_string FROM " + tableName
        + " where a_byte < 4 union all SELECT a_string FROM " + tableName
        + " where a_byte > 8) group by a_string limit 2";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] unionKeys = { A_VALUE, C_VALUE };
      int[] unionCounts = { 3, 1 };
      for (int i = 0; i < unionKeys.length; i++) {
        assertTrue(rows.next());
        assertEquals(unionKeys[i], rows.getString(1));
        assertEquals(unionCounts[i], rows.getInt(2));
      }
      assertFalse(rows.next());
    } finally {
      conn.close();
    }
  }

  /**
   * Verifies OFFSET (alone or paired with LIMIT) inside a derived table, outside it, and on
   * both levels with literal and bound values, plus OFFSET on an ordered aggregate and on a
   * UNION ALL.
   * @throws Exception if a query fails
   */
  @Test
  public void testDerivedTableWithOffset() throws Exception {
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Connection conn = DriverManager.getConnection(getUrl(), connProps);
    try {
      // Rows expected whenever one row is skipped and two are kept.
      String[] secondAndThird = { ROW2, ROW3 };

      // (LIMIT OFFSET )
      String sql =
        "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + " LIMIT 2 OFFSET 1) AS t";
      PreparedStatement stmt = conn.prepareStatement(sql);
      ResultSet rows = stmt.executeQuery();
      for (String eid : secondAndThird) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (OFFSET) limit
      sql =
        "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName + " OFFSET 1) AS t LIMIT 2";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String eid : secondAndThird) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (limit OFFSET) limit OFFSET
      sql = "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName
        + " LIMIT 2 OFFSET 1) AS t LIMIT 4 OFFSET 1";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      assertTrue(rows.next());
      assertEquals(ROW3, rows.getString(1));
      assertFalse(rows.next());

      // (limit OFFSET) limit 2
      sql = "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName
        + " LIMIT 4 OFFSET 1) AS t LIMIT 2";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String eid : secondAndThird) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (limit ? OFFSET ?) limit ? OFFSET ?
      sql = "SELECT t.eid FROM (SELECT entity_id eid FROM " + tableName
        + " LIMIT ? OFFSET ?) AS t LIMIT ? OFFSET ?";
      stmt = conn.prepareStatement(sql);
      // Bound in order: inner limit, inner offset, outer limit, outer offset.
      int[] binds = { 4, 2, 2, 2 };
      for (int i = 0; i < binds.length; i++) {
        stmt.setInt(i + 1, binds[i]);
      }
      rows = stmt.executeQuery();
      String[] fifthAndSixth = { ROW5, ROW6 };
      for (String eid : fifthAndSixth) {
        assertTrue(rows.next());
        assertEquals(eid, rows.getString(1));
      }
      assertFalse(rows.next());

      // (groupby orderby OFFSET)
      sql = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM " + tableName
        + " GROUP BY a_string ORDER BY sum(a_byte) OFFSET 1)";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] sumKeys = { A_VALUE, B_VALUE };
      int[] sums = { 10, 26 };
      for (int i = 0; i < sumKeys.length; i++) {
        assertTrue(rows.next());
        assertEquals(sumKeys[i], rows.getString(1));
        assertEquals(sums[i], rows.getInt(2));
      }
      assertFalse(rows.next());

      // (union OFFSET) groupby
      sql = "SELECT a_string, count(*) FROM (SELECT a_string FROM " + tableName
        + " where a_byte < 4 union all SELECT a_string FROM " + tableName
        + " where a_byte > 8 OFFSET 1) group by a_string";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] unionKeys = { A_VALUE, C_VALUE };
      int[] unionCounts = { 2, 1 };
      for (int i = 0; i < unionKeys.length; i++) {
        assertTrue(rows.next());
        assertEquals(unionKeys[i], rows.getString(1));
        assertEquals(unionCounts[i], rows.getInt(2));
      }
      assertFalse(rows.next());
    } finally {
      conn.close();
    }
  }

  /**
   * Verifies DISTINCT placed inside a derived table, outside it, and on both levels, as
   * well as DISTINCT over an inner GROUP BY (with and without ORDER BY) and over an inner
   * LIMIT.
   * @throws Exception if a query fails
   */
  @Test
  public void testDerivedTableWithDistinct() throws Exception {
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Connection conn = DriverManager.getConnection(getUrl(), connProps);
    try {
      // Distinct (a_string, b_string) pairs with b_string != C_VALUE, ordered by b then a.
      String[][] pairsWithoutC = { { A_VALUE, B_VALUE }, { B_VALUE, B_VALUE },
        { A_VALUE, E_VALUE }, { B_VALUE, E_VALUE }, { C_VALUE, E_VALUE } };

      // (distinct)
      String sql = "SELECT * FROM (SELECT DISTINCT a_string, b_string FROM " + tableName
        + ") AS t WHERE t.b_string != '" + C_VALUE + "' ORDER BY t.b_string, t.a_string";
      PreparedStatement stmt = conn.prepareStatement(sql);
      ResultSet rows = stmt.executeQuery();
      for (String[] pair : pairsWithoutC) {
        assertTrue(rows.next());
        assertEquals(pair[0], rows.getString(1));
        assertEquals(pair[1], rows.getString(2));
      }
      assertFalse(rows.next());

      // distinct ()
      sql = "SELECT DISTINCT t.a, t.b FROM (SELECT a_string a, b_string b FROM " + tableName
        + ") AS t WHERE t.b != '" + C_VALUE + "' ORDER BY t.b, t.a";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      for (String[] pair : pairsWithoutC) {
        assertTrue(rows.next());
        assertEquals(pair[0], rows.getString(1));
        assertEquals(pair[1], rows.getString(2));
      }
      assertFalse(rows.next());

      // distinct (distinct)
      sql = "SELECT DISTINCT t.a FROM (SELECT DISTINCT a_string a, b_string b FROM " + tableName
        + ") AS t";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[] distinctA = { A_VALUE, B_VALUE, C_VALUE };
      for (String a : distinctA) {
        assertTrue(rows.next());
        assertEquals(a, rows.getString(1));
      }
      assertFalse(rows.next());

      // distinct (groupby)
      sql = "SELECT distinct t.c FROM (SELECT count(*) c FROM " + tableName
        + " GROUP BY a_string) AS t";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      int[] groupSizes = { 1, 4 };
      for (int size : groupSizes) {
        assertTrue(rows.next());
        assertEquals(size, rows.getInt(1));
      }
      assertFalse(rows.next());

      // distinct (groupby) orderby
      sql = "SELECT distinct t.c FROM (SELECT count(*) c FROM " + tableName
        + " GROUP BY a_string) AS t ORDER BY t.c DESC";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      // Same sizes as above, largest first.
      for (int i = groupSizes.length - 1; i >= 0; i--) {
        assertTrue(rows.next());
        assertEquals(groupSizes[i], rows.getInt(1));
      }
      assertFalse(rows.next());

      // distinct (limit)
      sql = "SELECT DISTINCT t.a, t.b FROM (SELECT a_string a, b_string b FROM " + tableName
        + " LIMIT 2) AS t";
      stmt = conn.prepareStatement(sql);
      rows = stmt.executeQuery();
      String[][] limitedPairs = { { A_VALUE, B_VALUE }, { A_VALUE, C_VALUE } };
      for (String[] pair : limitedPairs) {
        assertTrue(rows.next());
        assertEquals(pair[0], rows.getString(1));
        assertEquals(pair[1], rows.getString(2));
      }
      assertFalse(rows.next());
    } finally {
      conn.close();
    }
  }

  /**
   * Verifies {@code count} aggregation combined with derived tables: counting inside the derived
   * table, and counting over derived tables that themselves use a filter, DISTINCT, GROUP BY,
   * LIMIT, an IN-subquery or ORDER BY.
   * <p>
   * Every query is expected to yield exactly one row holding a single integer.
   */
  @Test
  public void testDerivedTableWithAggregate() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      // (count)
      assertQueryReturnsSingleInt(conn,
        "SELECT * FROM (SELECT count(*) FROM " + tableName + " WHERE a_byte != 8) AS t", 8);

      // count ()
      assertQueryReturnsSingleInt(conn,
        "SELECT count(*) FROM (SELECT a_byte FROM " + tableName + ") AS t WHERE t.a_byte != 8",
        8);

      // count (distinct)
      assertQueryReturnsSingleInt(conn,
        "SELECT count(*) FROM (SELECT DISTINCT a_string FROM " + tableName + ") AS t", 3);

      // count (groupby)
      assertQueryReturnsSingleInt(conn,
        "SELECT count(*) FROM (SELECT count(*) c FROM " + tableName + " GROUP BY a_string) AS t",
        3);

      // count (limit)
      assertQueryReturnsSingleInt(conn,
        "SELECT count(*) FROM (SELECT entity_id FROM " + tableName + " LIMIT 2) AS t", 2);

      // count (subquery)
      assertQueryReturnsSingleInt(conn,
        "SELECT count(*) FROM (SELECT * FROM " + tableName
          + " WHERE (organization_id, entity_id) in (SELECT organization_id, entity_id FROM "
          + tableName + " WHERE a_byte != 8)) AS t",
        8);

      // count (orderby)
      assertQueryReturnsSingleInt(conn,
        "SELECT count(a_byte) FROM (SELECT * FROM " + tableName
          + " order by a_integer) AS t where a_byte != 8",
        8);
    }
  }

  /**
   * Executes {@code query} on {@code conn} and asserts that the result consists of exactly one row
   * whose first column, read as an int, equals {@code expected}. The statement and result set are
   * closed before returning, whether or not the assertions pass.
   * @param conn     open connection to run the query on
   * @param query    SQL text to prepare and execute (no bind parameters)
   * @param expected value expected in column 1 of the only row
   * @throws Exception if preparing, executing or reading the query fails
   */
  private static void assertQueryReturnsSingleInt(Connection conn, String query, int expected)
    throws Exception {
    try (PreparedStatement statement = conn.prepareStatement(query);
      ResultSet rs = statement.executeQuery()) {
      assertTrue(rs.next());
      assertEquals(expected, rs.getInt(1));

      assertFalse(rs.next());
    }
  }

  /**
   * Verifies outer GROUP BY, DISTINCT and COUNT queries over a derived table whose inner query is
   * a self-join of the test table ({@code t1.a_string = t2.b_string}) filtered on
   * {@code t1.a_byte >= 8}, with an additional outer filter {@code q.b2 != 5}.
   * <p>
   * Each statement and result set is scoped with try-with-resources so it is released as soon as
   * its assertions finish, rather than lingering until the connection is closed.
   */
  @Test
  public void testDerivedTableWithJoin() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      // groupby (join)
      String query =
        "SELECT q.id1, count(q.id2) FROM (SELECT t1.entity_id id1, t2.entity_id id2, t2.a_byte b2"
          + " FROM " + tableName + " t1 JOIN " + tableName + " t2 ON t1.a_string = t2.b_string"
          + " WHERE t1.a_byte >= 8) AS q WHERE q.b2 != 5 GROUP BY q.id1";
      try (PreparedStatement statement = conn.prepareStatement(query);
        ResultSet rs = statement.executeQuery()) {
        assertTrue(rs.next());
        assertEquals(ROW8, rs.getString(1));
        assertEquals(3, rs.getInt(2));
        assertTrue(rs.next());
        assertEquals(ROW9, rs.getString(1));
        assertEquals(2, rs.getInt(2));

        assertFalse(rs.next());
      }

      // distinct (join)
      query = "SELECT DISTINCT q.id1 FROM (SELECT t1.entity_id id1, t2.a_byte b2" + " FROM "
        + tableName + " t1 JOIN " + tableName + " t2 ON t1.a_string = t2.b_string"
        + " WHERE t1.a_byte >= 8) AS q WHERE q.b2 != 5";
      try (PreparedStatement statement = conn.prepareStatement(query);
        ResultSet rs = statement.executeQuery()) {
        assertTrue(rs.next());
        assertEquals(ROW8, rs.getString(1));
        assertTrue(rs.next());
        assertEquals(ROW9, rs.getString(1));

        assertFalse(rs.next());
      }

      // count (join)
      query =
        "SELECT COUNT(*) FROM (SELECT t2.a_byte b2" + " FROM " + tableName + " t1 JOIN " + tableName
          + " t2 ON t1.a_string = t2.b_string" + " WHERE t1.a_byte >= 8) AS q WHERE q.b2 != 5";
      try (PreparedStatement statement = conn.prepareStatement(query);
        ResultSet rs = statement.executeQuery()) {
        assertTrue(rs.next());
        assertEquals(5, rs.getInt(1));

        assertFalse(rs.next());
      }
    }
  }

  /**
   * Verifies derived tables nested two levels deep, both as a plain
   * {@code select(select(select))} with bind parameters and as the two sides of a join.
   * <p>
   * An index on {@code a_byte} (including {@code A_STRING} and {@code B_STRING}) is created first
   * if it does not already exist. All JDBC resources, including the DDL statement, are closed via
   * try-with-resources.
   */
  @Test
  public void testNestedDerivedTable() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      // testNestedDerivedTable requires an index with this name to be created
      String ddl = "CREATE INDEX IF NOT EXISTS " + tableName + "_DERIVED_IDX ON " + tableName
        + " (a_byte) INCLUDE (A_STRING, B_STRING)";
      // Fully qualified so this block does not depend on an additional import.
      try (java.sql.Statement ddlStatement = conn.createStatement()) {
        ddlStatement.execute(ddl);
      }

      // select(select(select))
      String query =
        "SELECT q.id, q.x10 * 10 FROM (SELECT t.eid id, t.x + 9 x10, t.astr a, t.bstr b FROM (SELECT entity_id eid, a_string astr, b_string bstr, a_byte + 1 x FROM "
          + tableName
          + " WHERE a_byte + 1 < ?) AS t ORDER BY b, id) AS q WHERE q.a = ? OR q.b = ? OR q.b = ?";
      try (PreparedStatement statement = conn.prepareStatement(query)) {
        statement.setInt(1, 9);
        statement.setString(2, A_VALUE);
        statement.setString(3, C_VALUE);
        statement.setString(4, E_VALUE);
        // Parameters must be bound before execution, hence the separate inner resource scope.
        try (ResultSet rs = statement.executeQuery()) {
          assertTrue(rs.next());
          assertEquals(ROW1, rs.getString(1));
          assertEquals(110, rs.getInt(2));
          assertTrue(rs.next());
          assertEquals(ROW4, rs.getString(1));
          assertEquals(140, rs.getInt(2));
          assertTrue(rs.next());
          assertEquals(ROW2, rs.getString(1));
          assertEquals(120, rs.getInt(2));
          assertTrue(rs.next());
          assertEquals(ROW5, rs.getString(1));
          assertEquals(150, rs.getInt(2));
          assertTrue(rs.next());
          assertEquals(ROW3, rs.getString(1));
          assertEquals(130, rs.getInt(2));
          assertTrue(rs.next());
          assertEquals(ROW6, rs.getString(1));
          assertEquals(160, rs.getInt(2));

          assertFalse(rs.next());
        }
      }

      // select(select(select) join (select(select)))
      query =
        "SELECT q1.id, q2.id FROM (SELECT t.eid id, t.astr a, t.bstr b FROM (SELECT entity_id eid, a_string astr, b_string bstr, a_byte abyte FROM "
          + tableName + ") AS t WHERE t.abyte >= ?) AS q1"
          + " JOIN (SELECT t.eid id, t.astr a, t.bstr b, t.abyte x FROM (SELECT entity_id eid, a_string astr, b_string bstr, a_byte abyte FROM "
          + tableName + ") AS t) AS q2 ON q1.a = q2.b"
          + " WHERE q2.x != ? ORDER BY q1.id, q2.id DESC";
      try (PreparedStatement statement = conn.prepareStatement(query)) {
        statement.setInt(1, 8);
        statement.setInt(2, 5);
        try (ResultSet rs = statement.executeQuery()) {
          assertTrue(rs.next());
          assertEquals(ROW8, rs.getString(1));
          assertEquals(ROW7, rs.getString(2));
          assertTrue(rs.next());
          assertEquals(ROW8, rs.getString(1));
          assertEquals(ROW4, rs.getString(2));
          assertTrue(rs.next());
          assertEquals(ROW8, rs.getString(1));
          assertEquals(ROW1, rs.getString(2));
          assertTrue(rs.next());
          assertEquals(ROW9, rs.getString(1));
          assertEquals(ROW8, rs.getString(2));
          assertTrue(rs.next());
          assertEquals(ROW9, rs.getString(1));
          assertEquals(ROW2, rs.getString(2));

          assertFalse(rs.next());
        }
      }
    }
  }
}
